---
title: "Kastrax AI Workflow System | Workflows | Kastrax"
description: "The Kastrax Workflow System provides a powerful framework for orchestrating complex AI agent operations with features like branching, parallel execution, resource suspension, and more."
---

# Kastrax AI Workflow System

The Kastrax Workflow System is a framework for orchestrating multi-step AI agent operations. Built on Kotlin's concurrency model, it lets you define workflows with branching, parallel execution, resource suspension, and more. It is designed for the multi-step processes that are common in AI applications.

## When to Use Workflows

Workflows are essential when your AI application requires orchestrating multiple operations in a structured manner. Consider using workflows when:

- **Complex Multi-step Processes**: Your application needs to execute a sequence of AI operations with dependencies between steps
- **Conditional Logic**: You need to implement branching paths based on intermediate results
- **Parallel Processing**: Multiple operations need to be executed concurrently
- **User Interaction**: Workflows need to pause and wait for user input before continuing
- **Error Handling**: You need sophisticated error recovery mechanisms for AI operations
- **Observability**: You want detailed tracking and monitoring of each step in your AI process

Kastrax's workflow system provides:

- A type-safe, Kotlin-based DSL for defining workflows and their steps
- Support for both simple (linear) and advanced (branching, parallel) execution paths
- Variable passing between steps with powerful reference resolution
- Suspension and resumption capabilities for long-running or user-interactive workflows
- Comprehensive error handling and recovery mechanisms
- Detailed observability and debugging features to track each workflow run

## Workflow Architecture

The Kastrax Workflow System is built on a modular architecture with several key components:

1. **Workflow Interface**: Defines the core operations for workflow execution
2. **Workflow Steps**: Individual units of work that can be combined into a workflow
3. **Workflow Context**: Maintains state and data during workflow execution
4. **Variable References**: Mechanism for passing data between steps
5. **Workflow Engine**: Orchestrates the execution of steps according to defined dependencies

This architecture enables flexible, maintainable, and observable workflow definitions.
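As a rough mental model only, most of these components can be pictured with a few plain Kotlin types. Every name below (`SketchContext`, `SketchStep`, `SketchWorkflow`, `runSketch`) is hypothetical and invented for illustration; this is not the Kastrax API. Variable references are left out, and the engine simply runs steps in declaration order.

```kotlin
// Hypothetical, dependency-free model of the components listed above.
// None of these names come from Kastrax.

// Workflow Context: holds the input and each finished step's output.
class SketchContext(val input: Map<String, Any?>) {
    val stepOutputs = linkedMapOf<String, Any?>()
}

// Workflow Step: one unit of work that reads the context and returns an output.
class SketchStep(val id: String, val action: (SketchContext) -> Any?)

// Workflow: a named, ordered collection of steps.
class SketchWorkflow(val name: String, val steps: List<SketchStep>)

// Workflow Engine: runs each step in order and records its output
// so that later steps can read it from the context.
fun runSketch(workflow: SketchWorkflow, input: Map<String, Any?>): Map<String, Any?> {
    val context = SketchContext(input)
    for (step in workflow.steps) {
        context.stepOutputs[step.id] = step.action(context)
    }
    return context.stepOutputs
}

// Example workflow: the second step reads the first step's output.
val sketchWorkflow = SketchWorkflow(
    name = "sketch",
    steps = listOf(
        SketchStep("research") { ctx -> "notes on ${ctx.input["topic"]}" },
        SketchStep("writing") { ctx -> "article from ${ctx.stepOutputs["research"]}" }
    )
)
```

Calling `runSketch(sketchWorkflow, mapOf("topic" to "AI"))` returns a map in which `writing` holds `article from notes on AI`, showing how the context carries one step's output into the next.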

## Example Workflow

The following example builds a simple content-creation workflow with research, writing, and editing steps.

### Creating a Basic Workflow

Here's how to define a workflow using Kastrax's Kotlin DSL:

```kotlin filename="ContentCreationWorkflow.kt"
import ai.kastrax.core.agent.Agent
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

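// Assume these agents are defined elsewhere; TODO() is a placeholder
// that keeps the snippet compilable until real agents are supplied
val researchAgent: Agent = TODO("provide the research agent")
val writingAgent: Agent = TODO("provide the writing agent")
val editingAgent: Agent = TODO("provide the editing agent")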

// Create the workflow
val contentWorkflow = workflow {
    name = "content-creation"
    description = "Create content about a topic"

    // Research step
    step(researchAgent) {
        id = "research"
        name = "Research"
        description = "Research the topic"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic")
        )
    }

    // Writing step (depends on research)
    step(writingAgent) {
        id = "writing"
        name = "Writing"
        description = "Write an article based on research"
        after("research")  // This step runs after the research step
        variables = mutableMapOf(
            "research" to variable("$.steps.research.output.text")
        )
    }

    // Editing step (depends on writing)
    step(editingAgent) {
        id = "editing"
        name = "Editing"
        description = "Edit the article"
        after("writing")  // This step runs after the writing step
        variables = mutableMapOf(
            "draft" to variable("$.steps.writing.output.text")
        )
    }

    // Define output mapping
    output {
        "researchSummary" from "$.steps.research.output.text"
        "finalArticle" from "$.steps.editing.output.text"
        "wordCount" from {
            "$.steps.editing.output.text" to { text ->
                // Split on any run of whitespace and ignore empty tokens
                (text as? String)?.split(Regex("\\s+"))?.count { it.isNotBlank() } ?: 0
            }
        }
    }
}
```

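This workflow defines three steps (research, writing, and editing), chains them with `after(...)` so that each step runs once its predecessor has finished, and uses the `output` block to map selected step outputs to named workflow outputs.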
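To make the effect of the `after(...)` declarations concrete, here is a minimal sketch of how an engine could derive an execution order from them using Kahn's topological sort. The helper name `executionOrder` and its input shape (step id mapped to the ids it waits for) are assumptions for illustration; the source does not describe how Kastrax schedules steps internally.

```kotlin
// Hypothetical helper: orders step ids so that every step comes after the
// steps it depends on (Kahn's algorithm). Not part of the Kastrax API.
fun executionOrder(dependsOn: Map<String, Set<String>>): List<String> {
    // Number of unfinished dependencies per step.
    val remaining = dependsOn.mapValues { it.value.size }.toMutableMap()
    // Steps with no dependencies are ready immediately.
    val ready = ArrayDeque(dependsOn.filterValues { it.isEmpty() }.keys)
    val order = mutableListOf<String>()

    while (ready.isNotEmpty()) {
        val done = ready.removeFirst()
        order += done
        // Finishing a step may unblock the steps that wait on it.
        for ((step, deps) in dependsOn) {
            if (done in deps) {
                val left = remaining.getValue(step) - 1
                remaining[step] = left
                if (left == 0) ready.addLast(step)
            }
        }
    }
    // Steps that never became ready are part of a cycle or wait on an unknown id.
    require(order.size == dependsOn.size) { "Cycle or unknown dependency detected" }
    return order
}
```

For the content workflow, `executionOrder(mapOf("research" to emptySet(), "writing" to setOf("research"), "editing" to setOf("writing")))` returns `[research, writing, editing]`, while a pair of steps that depend on each other fails the `require` check.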

### Registering the Workflow

Register your workflow with the Kastrax system to enable execution, logging, and telemetry:

```kotlin filename="KastraxSetup.kt"
import ai.kastrax.core.KastraxSystem

// Create a Kastrax system instance
val kastraxSystem = KastraxSystem()

// Register the workflow
kastraxSystem.registerWorkflow(contentWorkflow)

// You can also register multiple workflows
kastraxSystem.registerWorkflows(
    contentWorkflow,
    otherWorkflow,
    anotherWorkflow
)
```

You can also create workflows dynamically at runtime with a `DynamicWorkflowGenerator`:

```kotlin filename="DynamicWorkflowCreation.kt"
import ai.kastrax.core.workflow.dynamic.DynamicWorkflowGenerator

// Create a dynamic workflow generator
val workflowGenerator = DynamicWorkflowGenerator(kastraxSystem)

// Create a workflow dynamically
val dynamicWorkflow = workflowGenerator.createWorkflow(
    workflowName = "dynamic-workflow",
    description = "Dynamically created workflow"
) {
    // Define steps and flow here
    step(someAgent) {
        id = "dynamic-step"
        // ... step configuration
    }

    // ... more steps and configuration
}

// Register the dynamic workflow
kastraxSystem.registerWorkflow(dynamicWorkflow)
```

### Executing the Workflow

Execute your workflow programmatically:

```kotlin filename="ExecuteWorkflow.kt"
import ai.kastrax.core.workflow.WorkflowExecuteOptions
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Get the workflow engine
    val workflowEngine = kastraxSystem.workflowEngine

    // Execute the workflow
    val result = workflowEngine.executeWorkflow(
        workflowId = "content-creation",
        input = mapOf("topic" to "Artificial Intelligence in Healthcare"),
        options = WorkflowExecuteOptions(
            maxSteps = 10,  // Maximum number of steps to execute
            timeout = 60_000, // Timeout in milliseconds
            onStepFinish = { stepResult ->
                println("Step ${stepResult.stepId} completed")
            },
            onStepError = { stepId, error ->
                println("Error in step $stepId: ${error.message}")
            }
        )
    )

    // Process the result
    if (result.success) {
        println("Workflow completed successfully")
        println("Final article: ${result.output["finalArticle"]}")
        println("Word count: ${result.output["wordCount"]}")
    } else {
        println("Workflow failed: ${result.error}")
    }
}
```

You can also execute workflows via the Kastrax HTTP API (when running a Kastrax server):

```bash
curl --location 'http://localhost:8080/api/workflows/content-creation/execute' \
     --header 'Content-Type: application/json' \
     --data '{
       "topic": "Artificial Intelligence in Healthcare"
     }'
```

This example demonstrates the essentials: define your workflow with steps, register it with the Kastrax system, then execute it with input data.
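For callers that prefer Kotlin over `curl`, the same HTTP request can be built with the JDK's `java.net.http` API (Java 11 or later). This is a sketch: the URL and header are copied from the `curl` example above, `buildExecuteRequest` is a hypothetical helper name, and the request is only built here, not sent, because the response format is not described in this document.

```kotlin
import java.net.URI
import java.net.http.HttpRequest

// Builds (but does not send) the same POST request as the curl example.
// The host, port, and path are taken from that example and may differ
// in your deployment.
fun buildExecuteRequest(workflowId: String, jsonBody: String): HttpRequest =
    HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8080/api/workflows/$workflowId/execute"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
        .build()
```

To send it, pass the request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` while a Kastrax server is running.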

## Workflow Step Types

Kastrax supports various types of workflow steps to handle different scenarios:

### Agent Steps

Agent steps use AI agents to perform tasks:

```kotlin
step(agent) {
    id = "agent-step"
    name = "Agent Processing"
    description = "Use an agent to process data"
    variables = mutableMapOf(
        "input" to variable("$.input.data")
    )
}
```

### Tool Steps

Tool steps execute specific tools or functions:

```kotlin
toolStep {
    id = "data-processing"
    name = "Process Data"
    description = "Process data using a specific tool"
    tool = DataProcessingTool()
    variables = mutableMapOf(
        "data" to variable("$.input.rawData")
    )
}
```

### Conditional Steps

Conditional steps evaluate a condition against the workflow context and then run one of two branches:

```kotlin
conditionalStep {
    id = "quality-check"
    name = "Quality Check"
    description = "Check if the content meets quality standards"

    condition { context ->
        val wordCount = context.getVariable("$.steps.writing.output.wordCount") as? Int ?: 0
        wordCount > 500
    }

    onTrue {
        step(qualityAgent) {
            id = "quality-review"
            // ... configuration
        }
    }

    onFalse {
        step(revisionAgent) {
            id = "content-revision"
            // ... configuration
        }
    }
}
```
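The branching behavior can be sketched in plain Kotlin as a predicate plus two alternatives, of which exactly one runs. The names `branch` and `nextStepFor` are hypothetical, and the context is reduced to a simple map with an assumed `wordCount` key; the real context type is not shown in this document.

```kotlin
// Hypothetical sketch of a two-way branch: evaluate a predicate against the
// data collected so far, then run exactly one of the two branches.
fun <T> branch(
    context: Map<String, Any?>,
    condition: (Map<String, Any?>) -> Boolean,
    onTrue: () -> T,
    onFalse: () -> T
): T = if (condition(context)) onTrue() else onFalse()

// Mirrors the quality check above: more than 500 words goes to review,
// anything else (including a missing or non-integer count) goes to revision.
fun nextStepFor(context: Map<String, Any?>): String = branch(
    context,
    condition = { ctx -> (ctx["wordCount"] as? Int ?: 0) > 500 },
    onTrue = { "quality-review" },
    onFalse = { "content-revision" }
)
```

Note the boundary: a count of exactly 500 does not satisfy `> 500`, so `nextStepFor(mapOf("wordCount" to 500))` returns `content-revision`.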

### Human-in-the-Loop Steps

Human-in-the-loop steps pause the workflow until a person provides input or approval:

```kotlin
humanStep {
    id = "human-review"
    name = "Human Review"
    description = "Get human approval for the content"

    prompt { context ->
        val content = context.getVariable("$.steps.editing.output.text") as? String ?: ""
        "Please review the following content:\n\n$content\n\nApprove or suggest changes."
    }

    timeoutMs = 86_400_000  // 24 hours in milliseconds

    onTimeout {
        // Handle timeout case
        step(notificationAgent) {
            id = "timeout-notification"
            // ... configuration
        }
    }
}
```
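The wait-or-time-out behavior can be illustrated with a JDK `CompletableFuture` that a reviewer completes later. This is only a sketch under simplifying assumptions: `awaitHumanReply` is a hypothetical name, and it blocks the calling thread, whereas a real suspend-and-resume mechanism would more likely persist the workflow state and free the thread.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical sketch: wait for a reply that someone else supplies by
// completing the future; if nobody answers in time, run the timeout handler.
fun awaitHumanReply(
    reply: CompletableFuture<String>,
    timeoutMs: Long,
    onTimeout: () -> String
): String = try {
    reply.get(timeoutMs, TimeUnit.MILLISECONDS)
} catch (e: TimeoutException) {
    onTimeout()
}
```

A reviewer thread would call `reply.complete("approved")`; if that never happens within `timeoutMs`, the value produced by `onTimeout` is returned instead.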

## Advanced Workflow Features

### Parallel Execution

Execute multiple steps concurrently:

```kotlin
parallelSteps {
    step(researchAgent1) {
        id = "research-source1"
        // ... configuration
    }

    step(researchAgent2) {
        id = "research-source2"
        // ... configuration
    }

    step(researchAgent3) {
        id = "research-source3"
        // ... configuration
    }
}

// Continue with a step that uses results from all parallel steps
step(synthesisAgent) {
    id = "research-synthesis"
    after("research-source1", "research-source2", "research-source3")
    variables = mutableMapOf(
        "source1" to variable("$.steps.research-source1.output.text"),
        "source2" to variable("$.steps.research-source2.output.text"),
        "source3" to variable("$.steps.research-source3.output.text")
    )
}
```
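The fan-out/fan-in shape of this example (start several steps together, then continue once all have finished) can be sketched without any framework. To stay dependency-free, the sketch below uses the JDK's `CompletableFuture` rather than Kotlin coroutines, and `runInParallel` is a hypothetical helper name, not a Kastrax function.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical fan-out/fan-in: start every task at once, wait for all of
// them, and return the results keyed by step id.
fun runInParallel(tasks: Map<String, () -> String>): Map<String, String> {
    // Fan out: each task is submitted to the default async pool right away.
    val futures = tasks.mapValues { (_, task) -> CompletableFuture.supplyAsync { task() } }
    // Fan in: join() blocks until the corresponding task has finished.
    return futures.mapValues { (_, future) -> future.join() }
}
```

Because all futures are created before the first `join()`, the total wait is roughly that of the slowest task rather than the sum of all tasks. If any task throws, `join()` rethrows the failure wrapped in a `CompletionException`.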

### Error Handling

Attach an `onError` handler to a step to log the failure and run a fallback step:

```kotlin
step(riskAgent) {
    id = "risk-analysis"
    // ... configuration

    onError { error ->
        // Log the error
        println("Error in risk analysis: ${error.message}")

        // Execute fallback step
        step(fallbackAgent) {
            id = "risk-analysis-fallback"
            variables = mutableMapOf(
                "error" to variable(error.message)
            )
        }
    }
}
```
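Stripped of the DSL, the recovery pattern is "try the primary action, and on failure let a fallback produce the result". A minimal sketch, with `runWithFallback` as a hypothetical helper name:

```kotlin
// Hypothetical sketch of step-level recovery: try the primary action and,
// if it throws, hand the exception to a fallback that produces the result.
fun <T> runWithFallback(primary: () -> T, fallback: (Throwable) -> T): T =
    runCatching { primary() }.getOrElse { error -> fallback(error) }
```

One caveat when adapting this to coroutine code: `runCatching` catches every `Throwable`, including `CancellationException`, so coroutine-based implementations usually rethrow cancellation instead of treating it as a step failure.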

### Workflow Variables

Workflow variables pass data between steps. Declare typed inputs (optionally with default values) in the `input` block, then reference inputs and earlier step outputs by path:

```kotlin
// Define a workflow with variables
val workflowWithVariables = workflow {
    name = "data-processing-workflow"

    // Define input variables
    input {
        variable("dataSource", String::class)
        variable("processingLevel", Int::class, defaultValue = 1)
    }

    // Use variables in steps
    step(dataAgent) {
        id = "data-extraction"
        variables = mutableMapOf(
            "source" to variable("$.input.dataSource"),
            "level" to variable("$.input.processingLevel")
        )
    }

    // Use JSONPath expressions to access nested properties
    step(analysisAgent) {
        id = "data-analysis"
        after("data-extraction")
        variables = mutableMapOf(
            "rawData" to variable("$.steps.data-extraction.output.data"),
            "metadata" to variable("$.steps.data-extraction.output.metadata.timestamp")
        )
    }
}
```
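To show what resolving a reference such as `$.steps.data-extraction.output.metadata.timestamp` involves, here is a deliberately small resolver over nested maps. It is an assumption-laden sketch: `resolveReference` is a hypothetical name, it understands only dot-separated keys, and the actual Kastrax resolver may support more JSONPath features or behave differently.

```kotlin
// Hypothetical resolver for references such as "$.steps.writing.output.text".
// It only understands dot-separated keys; real JSONPath has many more
// features, and the actual Kastrax resolver may behave differently.
fun resolveReference(path: String, root: Map<String, Any?>): Any? {
    require(path.startsWith("$.")) { "Unsupported reference: $path" }
    var current: Any? = root
    for (key in path.removePrefix("$.").split(".")) {
        // Walk one level down; a missing key or a non-map value resolves to null.
        current = (current as? Map<*, *>)?.get(key)
    }
    return current
}
```

With this simple splitting, a hyphenated step id such as `data-extraction` is just another key. Stricter JSONPath implementations typically require bracket notation for such names, so it is worth checking how your Kastrax version treats them.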

### Observability and Monitoring

Attach a `monitoring` block to a workflow to receive callbacks when the workflow and its individual steps start, complete, or fail:

```kotlin
// Configure workflow monitoring
val monitoredWorkflow = workflow {
    name = "monitored-workflow"
    // ... workflow configuration

    monitoring {
        onStart { workflowId, input ->
            println("Workflow $workflowId started with input: $input")
        }

        onStepStart { workflowId, stepId ->
            println("Step $stepId in workflow $workflowId started")
        }

        onStepComplete { workflowId, stepId, output ->
            println("Step $stepId in workflow $workflowId completed with output: $output")
        }

        onComplete { workflowId, output ->
            println("Workflow $workflowId completed with output: $output")
        }

        onError { workflowId, error ->
            println("Workflow $workflowId failed with error: ${error.message}")
        }
    }
}
```
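The callback flow can be sketched as a runner that invokes hooks at fixed points around each step. `SketchHooks` and `runWithHooks` are hypothetical names, and stopping at the first failed step is an assumption made for this sketch; the document does not state what Kastrax does after a step error.

```kotlin
// Hypothetical sketch of lifecycle hooks: the runner calls each callback at
// the matching point, so an observer sees events in execution order.
class SketchHooks(
    val onStepStart: (stepId: String) -> Unit = {},
    val onStepComplete: (stepId: String, output: Any?) -> Unit = { _, _ -> },
    val onError: (stepId: String, error: Throwable) -> Unit = { _, _ -> }
)

fun runWithHooks(steps: List<Pair<String, () -> Any?>>, hooks: SketchHooks) {
    for ((stepId, action) in steps) {
        hooks.onStepStart(stepId)
        val output = try {
            action()
        } catch (e: Exception) {
            // Report the failure and stop; later steps are not started.
            hooks.onError(stepId, e)
            return
        }
        hooks.onStepComplete(stepId, output)
    }
}
```

Collecting the hook calls into a list is a simple way to assert on step ordering in tests, or to forward the same events to a logger or metrics system.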

## Integration with Actor Model

Kastrax integrates the workflow system with its actor model, which enables distributed workflow execution:

```kotlin
import ai.kastrax.actor.ActorSystem
import ai.kastrax.actor.Props
import ai.kastrax.workflow.WorkflowActor
import ai.kastrax.workflow.messages.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Create an actor system
    val system = ActorSystem("workflow-system")

    // Create a workflow actor
    val workflowActor = system.actorOf(
        Props.create(WorkflowActor::class.java, contentWorkflow),
        "workflow-actor"
    )

    // Send a workflow execution message
    val result = system.ask<WorkflowResult>(
        workflowActor,
        ExecuteWorkflowMessage(input = mapOf("topic" to "AI in Healthcare"))
    )

    // Process the result
    when (result) {
        is WorkflowResult.Success -> {
            println("Workflow executed successfully")
            println("Final article: ${result.output["finalArticle"]}")
        }
        is WorkflowResult.Error -> {
            println("Workflow execution failed: ${result.error}")
        }
    }

    // Shutdown the actor system when done
    system.terminate()
}
```

This integration enables building sophisticated multi-agent systems where workflows can be distributed across different nodes and executed concurrently.
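The request-reply ("ask") pattern used above can be illustrated in a single process with a one-thread mailbox. This sketch is not the Kastrax actor implementation and is not distributed: `SketchActor` is a hypothetical class that only shows how messages are handled one at a time while callers receive a future for the reply.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

// Hypothetical, single-process sketch of the "ask" pattern: messages are
// queued on one thread (the mailbox) and each reply arrives via a future.
class SketchActor<M, R>(private val handle: (M) -> R) {
    // A single-threaded executor processes messages one at a time, in order.
    private val mailbox = Executors.newSingleThreadExecutor()

    // Enqueue a message and return a future that completes with the reply.
    fun ask(message: M): CompletableFuture<R> =
        CompletableFuture.supplyAsync({ handle(message) }, mailbox)

    // Stop accepting messages; already queued messages still run.
    fun stop() = mailbox.shutdown()
}
```

Because the mailbox thread is not a daemon thread, call `stop()` when the actor is no longer needed, much as the example above calls `system.terminate()`.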

## Real-World Use Cases

Kastrax workflows are ideal for a variety of AI agent applications:

### Content Creation Pipeline

Orchestrate a complete content creation process:

- Research step gathers information from multiple sources
- Planning step creates an outline based on research
- Writing step generates the initial content
- Editing step refines and improves the content
- Review step (human-in-the-loop) gets final approval

### Customer Support Automation

Handle complex customer inquiries:

- Classification step determines the type of inquiry
- Information retrieval step gathers relevant knowledge base articles
- Response generation step creates a personalized response
- Escalation step (conditional) routes complex issues to human agents

### Data Analysis Workflow

Process and analyze large datasets:

- Data collection step gathers information from multiple sources
- Preprocessing step cleans and normalizes the data
- Analysis step extracts insights from the data
- Visualization step creates charts and graphs
- Reporting step generates a comprehensive report

## More Resources

- The [Workflow Guide](../guides/ai-recruiter.mdx) in the Guides section is a tutorial that covers the main concepts
- [Sequential Steps workflow example](../../examples/workflows/sequential-steps.mdx)
- [Parallel Steps workflow example](../../examples/workflows/parallel-steps.mdx)
- [Branching Paths workflow example](../../examples/workflows/branching-paths.mdx)
- [Workflow Variables example](../../examples/workflows/workflow-variables.mdx)
- [Cyclical Dependencies workflow example](../../examples/workflows/cyclical-dependencies.mdx)
- [Suspend and Resume workflow example](../../examples/workflows/suspend-and-resume.mdx)
